Track parquet writer encoding memory usage on MemoryPool #11345
Conversation
Force-pushed 518a7e9 to 073e471
…n, before selecting shrinking for data bytes flushed
…arallelized call stack for col vs rg
Thanks @wiedld -- please mark this PR as ready for review when it is ready for another look
// TODO: update error handling in ParquetSink
"Unable to send array to writer!",
The parallelized writes have vectors of channels and vectors of spawned tasks. The error we are hitting here (when the memory limit is reached) comes from the closed channel.
I believe we want to surface the errors from the tasks themselves, which should exit due to the memory reservation error. I need to poke around a bit more.
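To illustrate the masking effect described above, here is a minimal sketch using plain std threads and channels (not the actual tokio tasks used by ParquetSink): when a task fails and exits, the sender only ever observes a generic "closed channel" failure, while the informative root-cause error is only available from the task's join result.

```rust
use std::sync::mpsc;
use std::thread;

// Spawn a task that fails early (simulating a memory-reservation error).
// Its receiver is dropped when it exits, closing the channel.
fn run_task() -> (mpsc::Sender<i32>, thread::JoinHandle<Result<(), String>>) {
    let (tx, rx) = mpsc::channel::<i32>();
    let handle = thread::spawn(move || -> Result<(), String> {
        let _rx = rx; // receiver lives only as long as the task
        Err("memory limit exceeded".to_string()) // the root-cause error
    });
    (tx, handle)
}

fn main() {
    let (tx, handle) = run_task();
    // Join first so the receiver is guaranteed to be dropped.
    let root_cause = handle.join().unwrap().unwrap_err();
    // The informative error comes from the join result...
    assert_eq!(root_cause, "memory limit exceeded");
    // ...while the sender only sees that the channel is closed.
    assert!(tx.send(1).is_err());
    println!("root cause surfaced from join: {root_cause}");
}
```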
I think we need to update several map_err statements to propagate inner error messages rather than ignore them. E.g.
datafusion/datafusion/core/src/datasource/file_format/parquet.rs
Lines 880 to 884 in b96186f
col_array_channels[next_channel]
    .send(c)
    .await
    .map_err(|_| {
        DataFusionError::Internal("Unable to send array to writer!".into())
change to something like
col_array_channels[next_channel]
.send(c)
.await
.map_err(|e| internal_datafusion_err!("Unable to send array to writer due to error {e}"))
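A self-contained sketch of the difference, using std::sync::mpsc and a hypothetical InternalError type standing in for DataFusionError::Internal: the first map_err discards the inner error, the second preserves its message.

```rust
use std::sync::mpsc;

// Hypothetical stand-in for DataFusionError::Internal.
#[derive(Debug)]
struct InternalError(String);

// Discards the inner error: the caller only sees a generic message.
fn send_opaque(tx: &mpsc::Sender<i32>, v: i32) -> Result<(), InternalError> {
    tx.send(v)
        .map_err(|_| InternalError("Unable to send array to writer!".into()))
}

// Propagates the inner error message, as suggested in the review.
fn send_verbose(tx: &mpsc::Sender<i32>, v: i32) -> Result<(), InternalError> {
    tx.send(v)
        .map_err(|e| InternalError(format!("Unable to send array to writer due to error {e}")))
}

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();
    drop(rx); // close the channel so both sends fail
    let opaque = send_opaque(&tx, 1).unwrap_err();
    let verbose = send_verbose(&tx, 1).unwrap_err();
    // The opaque variant hides the cause; the verbose one includes it.
    assert_eq!(opaque.0, "Unable to send array to writer!");
    assert!(verbose.0.contains("sending on a closed channel"));
    println!("{}", verbose.0);
}
```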
Filed #11397 to track
FYI @devinjdangelo
let (writer, col_reservation) = task.join_unwind().await?;
let encoded_size = writer.get_estimated_total_bytes();
rg_reservation.grow(encoded_size);
drop(col_reservation);
I don't think the explicit drop is needed -- it will be dropped automatically by the compiler when col_reservation goes out of scope on the next line.
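A small sketch of Rust's automatic drop behavior (the Reservation type here is hypothetical and only records when it is dropped): locals are dropped in reverse declaration order when their scope ends, so no explicit drop call is required.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical reservation that logs its own drop.
struct Reservation {
    name: &'static str,
    log: Rc<RefCell<Vec<&'static str>>>,
}

impl Drop for Reservation {
    fn drop(&mut self) {
        self.log.borrow_mut().push(self.name);
    }
}

// No explicit drop anywhere: locals drop in reverse declaration
// order at the end of the inner scope.
fn scope() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    {
        let _col = Reservation { name: "col_reservation", log: Rc::clone(&log) };
        let _rg = Reservation { name: "rg_reservation", log: Rc::clone(&log) };
    } // `_rg` drops first, then `_col`
    Rc::try_unwrap(log).unwrap().into_inner()
}

fn main() {
    assert_eq!(scope(), vec!["rg_reservation", "col_reservation"]);
    println!("drop order: {:?}", scope());
}
```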
Thanks @wiedld -- this is looking quite close
Force-pushed 46dc2d3 to 6ce964e
Thank you @devinjdangelo! Note that the error message propagated back is …
Thank you @wiedld and @devinjdangelo -- I think this looks good to me now
I agree with @devinjdangelo it would be great to improve the error message propagation -- but I also think we could do that as a follow on PR.
Edit: looks like @wiedld plans to do as a follow on PR so I'll file a ticket and merge this one
Follow on ticket: #11397
🚀
* feat(11344): track memory used for non-parallel writes
* feat(11344): track memory usage during parallel writes
* test(11344): create bounded stream for testing
* test(11344): test ParquetSink memory reservation
* feat(11344): track bytes in file writer
* refactor(11344): tweak the ordering to add col bytes to rg_reservation, before selecting shrinking for data bytes flushed
* refactor: move each col_reservation and rg_reservation to match the parallelized call stack for col vs rg
* test(11344): add memory_limit enforcement test for parquet sink
* chore: cleanup to remove unnecessary reservation management steps
* fix: fix CI test failure due to file extension rename
Which issue does this PR close?
Closes #11344
Rationale for this change
ParquetSink can use non-trivial amounts of memory to buffer row groups prior to flush. When it executes within a task context, this memory usage should therefore be accounted against the task's memory pool.
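As a rough sketch of the accounting pattern this PR applies (the Pool and Reservation types below are simplified, hypothetical stand-ins, not DataFusion's actual MemoryPool/MemoryReservation API): grow a reservation while buffering row-group bytes, shrink it as bytes are flushed to the file, and return everything to the pool when the reservation is dropped.

```rust
use std::sync::{Arc, Mutex};

// Simplified stand-in for a shared memory pool with a hard limit.
struct Pool {
    used: Mutex<usize>,
    limit: usize,
}

// Simplified stand-in for a per-writer memory reservation.
struct Reservation {
    pool: Arc<Pool>,
    size: usize,
}

impl Reservation {
    // Account additional buffered bytes, failing if the pool limit is hit.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        let mut used = self.pool.used.lock().unwrap();
        if *used + bytes > self.pool.limit {
            return Err(format!(
                "memory limit exceeded: {} + {} > {}",
                *used, bytes, self.pool.limit
            ));
        }
        *used += bytes;
        self.size += bytes;
        Ok(())
    }

    // Release bytes that have been flushed out of the buffer.
    fn shrink(&mut self, bytes: usize) {
        *self.pool.used.lock().unwrap() -= bytes;
        self.size -= bytes;
    }
}

impl Drop for Reservation {
    fn drop(&mut self) {
        // Dropping the reservation returns its remaining bytes to the pool.
        *self.pool.used.lock().unwrap() -= self.size;
    }
}

fn main() {
    let pool = Arc::new(Pool { used: Mutex::new(0), limit: 1000 });
    let mut rg = Reservation { pool: Arc::clone(&pool), size: 0 };
    rg.try_grow(600).unwrap();          // buffer row-group bytes
    rg.shrink(200);                     // data flushed to the file
    assert!(rg.try_grow(700).is_err()); // limit enforced: 400 + 700 > 1000
    drop(rg);
    assert_eq!(*pool.used.lock().unwrap(), 0); // all bytes returned
}
```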
What changes are included in this PR?
Ensure memory accounting under three use cases for ParquetSink:
How parallelized-write memory tracking works is summarized here. I feel like it should be a doc comment, but I wasn't sure where. 🤔
Are these changes tested?
Yes
Are there any user-facing changes?
No